{
 "cells": [
  {
   "cell_type": "code",
   "execution_count": 1,
   "metadata": {},
   "outputs": [],
   "source": [
    "# stop any SparkContext left over from a previous run, then create entry points to Spark\n",
    "try:\n",
    "    sc.stop()\n",
    "except Exception:  # e.g. NameError when no context exists yet\n",
    "    pass\n",
    "from pyspark import SparkContext\n",
    "from pyspark.sql import SparkSession\n",
    "sc = SparkContext()\n",
    "spark = SparkSession(sparkContext=sc)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "## Merge and split columns\n",
    "\n",
    "Sometimes we need to merge multiple columns of a DataFrame into one column, or split one column into several. One way to do this is to convert the DataFrame to an RDD, apply a map function to reshape each row, and then convert the RDD back to a DataFrame.\n",
    "\n",
    "### Example data frame"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 2,
   "metadata": {},
   "outputs": [],
   "source": [
    "mtcars = spark.read.csv(path='../../data/mtcars.csv',\n",
    "                        sep=',',\n",
    "                        encoding='UTF-8',\n",
    "                        comment=None,\n",
    "                        header=True, \n",
    "                        inferSchema=True)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 3,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|              _c0| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|\n",
      "|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|\n",
      "|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|\n",
      "|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|\n",
      "|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "mtcars.show(n=5)"
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 4,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|            model| mpg|cyl| disp| hp|drat|   wt| qsec| vs| am|gear|carb|\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "|        Mazda RX4|21.0|  6|160.0|110| 3.9| 2.62|16.46|  0|  1|   4|   4|\n",
      "|    Mazda RX4 Wag|21.0|  6|160.0|110| 3.9|2.875|17.02|  0|  1|   4|   4|\n",
      "|       Datsun 710|22.8|  4|108.0| 93|3.85| 2.32|18.61|  1|  1|   4|   1|\n",
      "|   Hornet 4 Drive|21.4|  6|258.0|110|3.08|3.215|19.44|  1|  0|   3|   1|\n",
      "|Hornet Sportabout|18.7|  8|360.0|175|3.15| 3.44|17.02|  0|  0|   3|   2|\n",
      "+-----------------+----+---+-----+---+----+-----+-----+---+---+----+----+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# rename the first column (read in as '_c0') to 'model'\n",
    "colnames = mtcars.columns\n",
    "colnames[0] = 'model'\n",
    "mtcars = mtcars.rdd.toDF(colnames)\n",
    "mtcars.show(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Merge multiple columns\n",
    "\n",
    "We convert the DataFrame to an RDD, then apply the **map** function to turn each row into a **Row** object \n",
    "that keeps the model name and packs the remaining values into a single **values** field."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 5,
   "metadata": {},
   "outputs": [
    {
     "data": {
      "text/plain": [
       "[Row(model='Mazda RX4', values=(21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4)),\n",
       " Row(model='Mazda RX4 Wag', values=(21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4)),\n",
       " Row(model='Datsun 710', values=(22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1)),\n",
       " Row(model='Hornet 4 Drive', values=(21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1)),\n",
       " Row(model='Hornet Sportabout', values=(18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2))]"
      ]
     },
     "execution_count": 5,
     "metadata": {},
     "output_type": "execute_result"
    }
   ],
   "source": [
    "from pyspark.sql import Row\n",
    "mtcars_rdd = mtcars.rdd.map(lambda x: Row(model=x[0], values=x[1:]))\n",
    "mtcars_rdd.take(5)"
   ]
  },
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "Then we create a new DataFrame from the resulting RDD."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 6,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+-----------------------------------------------------+\n",
      "|model            |values                                               |\n",
      "+-----------------+-----------------------------------------------------+\n",
      "|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9, 2.62, 16.46, 0, 1, 4, 4]  |\n",
      "|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9, 2.875, 17.02, 0, 1, 4, 4] |\n",
      "|Datsun 710       |[22.8, 4, 108.0, 93, 3.85, 2.32, 18.61, 1, 1, 4, 1]  |\n",
      "|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08, 3.215, 19.44, 1, 0, 3, 1]|\n",
      "|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15, 3.44, 17.02, 0, 0, 3, 2] |\n",
      "+-----------------+-----------------------------------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "mtcars_df = spark.createDataFrame(mtcars_rdd)\n",
    "mtcars_df.show(5, truncate=False)"
   ]
  },
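  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "The same merge can also be sketched without leaving the DataFrame API. The cell below is a minimal sketch, not part of the original workflow: it assumes the `mtcars` DataFrame from above and uses `pyspark.sql.functions.struct` to pack every column except **model** into one struct column. The names `value_cols` and `mtcars_struct` are introduced here for illustration only."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F\n",
    "\n",
    "# every column except the model name goes into the merged column\n",
    "value_cols = [c for c in mtcars.columns if c != 'model']\n",
    "\n",
    "# struct() keeps each value's original type and field name\n",
    "mtcars_struct = mtcars.select('model', F.struct(*value_cols).alias('values'))\n",
    "mtcars_struct.show(5, truncate=False)"
   ]
  },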
  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "### Split one column\n",
    "\n",
    "We use the DataFrame created above as our example data. Again, we convert the DataFrame to an RDD and reshape each row with **map**.\n",
    "\n",
    "Let's split the **values** column into two columns: x1 and x2. The first 5 values will be in column **x1** and the remaining values will be in column **x2**."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": 7,
   "metadata": {},
   "outputs": [
    {
     "name": "stdout",
     "output_type": "stream",
     "text": [
      "+-----------------+---------------------------+--------------------------+\n",
      "|model            |x1                         |x2                        |\n",
      "+-----------------+---------------------------+--------------------------+\n",
      "|Mazda RX4        |[21.0, 6, 160.0, 110, 3.9] |[2.62, 16.46, 0, 1, 4, 4] |\n",
      "|Mazda RX4 Wag    |[21.0, 6, 160.0, 110, 3.9] |[2.875, 17.02, 0, 1, 4, 4]|\n",
      "|Datsun 710       |[22.8, 4, 108.0, 93, 3.85] |[2.32, 18.61, 1, 1, 4, 1] |\n",
      "|Hornet 4 Drive   |[21.4, 6, 258.0, 110, 3.08]|[3.215, 19.44, 1, 0, 3, 1]|\n",
      "|Hornet Sportabout|[18.7, 8, 360.0, 175, 3.15]|[3.44, 17.02, 0, 0, 3, 2] |\n",
      "+-----------------+---------------------------+--------------------------+\n",
      "only showing top 5 rows\n",
      "\n"
     ]
    }
   ],
   "source": [
    "# x[1] is the merged values field: slice it into the first 5 values and the rest\n",
    "mtcars_rdd_2 = mtcars_df.rdd.map(lambda x: Row(model=x[0], x1=x[1][:5], x2=x[1][5:]))\n",
    "# convert RDD back to DataFrame\n",
    "mtcars_df_2 = spark.createDataFrame(mtcars_rdd_2)\n",
    "mtcars_df_2.show(5, truncate=False)"
   ]
  },
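  {
   "cell_type": "markdown",
   "metadata": {},
   "source": [
    "A split can likewise be sketched with DataFrame functions alone. This is a minimal sketch, not part of the original workflow: it assumes the `mtcars` DataFrame from above, rebuilds a struct column from it so the cell is self-contained, and then selects nested fields with the `values.<field>` dot syntax to form **x1** (the first 5 fields) and **x2** (the remaining fields). The names `value_cols`, `merged`, and `split_df` are hypothetical and used for illustration only."
   ]
  },
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {},
   "outputs": [],
   "source": [
    "from pyspark.sql import functions as F\n",
    "\n",
    "# merge every column except the model name into one struct column\n",
    "value_cols = [c for c in mtcars.columns if c != 'model']\n",
    "merged = mtcars.select('model', F.struct(*value_cols).alias('values'))\n",
    "\n",
    "# nested struct fields can be addressed as 'values.<field name>'\n",
    "x1 = F.struct(*['values.' + c for c in value_cols[:5]]).alias('x1')\n",
    "x2 = F.struct(*['values.' + c for c in value_cols[5:]]).alias('x2')\n",
    "split_df = merged.select('model', x1, x2)\n",
    "split_df.show(5, truncate=False)"
   ]
  },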
  {
   "cell_type": "code",
   "execution_count": null,
   "metadata": {
    "collapsed": true
   },
   "outputs": [],
   "source": []
  }
 ],
 "metadata": {
  "kernelspec": {
   "display_name": "Python 3",
   "language": "python",
   "name": "python3"
  },
  "language_info": {
   "codemirror_mode": {
    "name": "ipython",
    "version": 3
   },
   "file_extension": ".py",
   "mimetype": "text/x-python",
   "name": "python",
   "nbconvert_exporter": "python",
   "pygments_lexer": "ipython3",
   "version": "3.6.5"
  }
 },
 "nbformat": 4,
 "nbformat_minor": 2
}
